package com.duobaoyu.chatwebsocket.listener;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.duobaoyu.chatwebsocket.constant.MqEventConstant;
import com.duobaoyu.chatwebsocket.constant.RedisConstant;
import com.duobaoyu.chatwebsocket.dto.*;
import com.duobaoyu.chatwebsocket.enums.SourceTypeEnum;
import com.duobaoyu.chatwebsocket.handler.ChatReadHandler;
import com.duobaoyu.chatwebsocket.handler.ChatWriteHandler;
import com.duobaoyu.chatwebsocket.netty.UserChannelRel;
import com.duobaoyu.chatwebsocket.util.DingTalkUtil;
import com.duobaoyu.chatwebsocket.util.JsonObjectUtil;
import com.duobaoyu.chatwebsocket.util.NetUtil;
import com.duobaoyu.chatwebsocket.util.SpringUtil;
import com.duobaoyu.chatwebsocket.util.UUIDUtil;
import com.google.common.collect.Lists;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.util.Strings;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * MQ listener that dispatches incoming {@link WebSocketMqDto} events to the websocket
 * channels registered in {@link UserChannelRel}.
 *
 * <p>Each message is deserialized, routed by its {@code event} value and written out through
 * {@link ChatWriteHandler}. Any exception raised while handling a message is logged, reported
 * through {@link DingTalkUtil} and then either handed back to the broker for redelivery or
 * committed, depending on how often the message has already been reconsumed.
 *
 * @author chenchao
 */
@Component
@Slf4j
public class WeChatToEmpListener implements MessageListener {

    @Resource
    private ChatWriteHandler chatWriteHandler;

    @Resource
    private ChatReadHandler chatReadHandler;

    /** A failed message is handed back for redelivery while its reconsume count is {@code <= LIMIT}. */
    private static final int LIMIT = 3;

    /**
     * Handles one MQ message.
     *
     * @param message the delivered message; its body is expected to be a JSON encoded {@link WebSocketMqDto}
     * @param context consume context supplied by the MQ client (not used)
     * @return {@link Action#CommitMessage} when the message was handled or has exceeded the retry limit,
     *         {@link Action#ReconsumeLater} when handling failed and the message should be redelivered
     */
    @Override
    public Action consume(Message message, ConsumeContext context) {
        // Decoded without JSON parsing so that a malformed payload can still be logged and alerted on.
        String bodyString = decodeBody(message);
        log.info("WeChatToEmpListener_consume_message:{},body:{}", message, bodyString);
        try {
            // Deserialization and channel lookup live inside the try block so that a bad payload
            // goes through the same alert + bounded-retry path as any other handling failure.
            WebSocketMqDto webSocketMqDto = JSONObject.parseObject(message.getBody(), WebSocketMqDto.class);
            if (webSocketMqDto == null) {
                throw new IllegalArgumentException("message body does not contain a WebSocketMqDto");
            }
            String event = webSocketMqDto.getEvent();
            if (event == null) {
                throw new IllegalArgumentException("message does not carry an event");
            }
            List<String> channelIdList = findEmpChannelIds(webSocketMqDto.getEmpId());
            log.warn("bodyString : {},channelIdList : {},saleId : {}", bodyString, JSON.toJSON(channelIdList), webSocketMqDto.getEmpId());
            WebSocketMessageDto<ChatContentDto> webSocketMessageDto = new WebSocketMessageDto<>();
            log.info("event_type : {}", event);
            switch (event) {
                // customer message -> advisor
                case MqEventConstant.USER_TO_EMP:
                    log.info("user_to_emp : {}", JSON.toJSON(webSocketMqDto));
                    weChatToEmp(webSocketMqDto, channelIdList, webSocketMessageDto);
                    break;
                case MqEventConstant.LABEL_MSG:
                    labelToEmp(webSocketMqDto, channelIdList, webSocketMessageDto);
                    break;
                // processing result of an advisor message
                case MqEventConstant.MESSAGE_RESULT:
                    log.info("MESSAGE_RESULT : {}", event);
                    WebSocketResultDto webSocketResultDto = JsonObjectUtil.parseObject(webSocketMqDto.getContent(), WebSocketResultDto.class);
                    log.info("sale_message_result : {}", JSON.toJSON(webSocketResultDto));
                    String channel = webSocketMqDto.getCurrentChannelId();
                    chatWriteHandler.messageResult(webSocketResultDto, channel);
                    break;
                // advisor has to log in again
                case MqEventConstant.NEED_LOGIN:
                    handleNeedLogin(webSocketMqDto, channelIdList);
                    break;
                // visitor login result
                case MqEventConstant.USER_CONNECT_RESULT:
                    handleUserConnectResult(webSocketMqDto);
                    break;
                // advisor message -> Toutiao client
                case MqEventConstant.MESSAGE_TO_CLIENT_WEBSOCKET:
                    chatWriteHandler.toClient(webSocketMqDto);
                    break;
                // processing result of a user message
                case MqEventConstant.CLIENT_MESSAGE_RESULT:
                    WebSocketResultDto webSocketUserResultDto = JsonObjectUtil.parseObject(webSocketMqDto.getContent(), WebSocketResultDto.class);
                    String to = webSocketMqDto.getTo();
                    chatWriteHandler.toClientResult(webSocketUserResultDto, to);
                    break;
                // multi-device message synchronisation
                case MqEventConstant.SYNC_MSG:
                    handleSyncMsg(webSocketMqDto, channelIdList);
                    break;
                // advisor switched reception off
                case MqEventConstant.SALE_CLOSE_RECEPTION_SWITCH:
                    saleCloseReceptionSwitchToEmp(webSocketMqDto, channelIdList, webSocketMessageDto);
                    break;
                // advisor switched reception on
                case MqEventConstant.SALE_OPEN_RECEPTION_SWITCH:
                    saleOpenReceptionSwitchToEmp(webSocketMqDto, channelIdList, webSocketMessageDto);
                    break;
                default:
                    // messages that do not belong to the chat system
                    handleOtherMsg(webSocketMqDto, channelIdList);
            }

            return Action.CommitMessage;
        } catch (Exception e) {
            return handleFailure(message, bodyString, e);
        }
    }

    /**
     * Decodes the raw message body as UTF-8 text for logging and alerting.
     *
     * @param message the delivered message
     * @return the body as text, or an empty string when the message has no body
     */
    private static String decodeBody(Message message) {
        byte[] body = message.getBody();
        return body == null ? "" : new String(body, StandardCharsets.UTF_8);
    }

    /**
     * Looks up the channel ids registered for an advisor.
     *
     * @param empId advisor id taken from the message; may be {@code null} for events that do not carry one
     * @return the registered channel ids, or {@code null} when {@code empId} is {@code null} or nothing is
     *         registered (callers check with {@link CollectionUtils#isEmpty})
     */
    private List<String> findEmpChannelIds(String empId) {
        if (empId == null) {
            return null;
        }
        return UserChannelRel.empIdWithChannelList.get(RedisConstant.EMP_CHANNEL_RELATION.concat(empId));
    }

    /**
     * Tells whether a channel id still maps to a channel in {@link UserChannelRel#channelIdWithChannel}.
     *
     * @param channelId channel id to look up
     * @return {@code true} when a channel is registered under that id
     */
    private boolean hasRegisteredChannel(String channelId) {
        return UserChannelRel.channelIdWithChannel.get(channelId) != null;
    }

    /**
     * Logs a handling failure, sends a best-effort DingTalk alert and decides whether to retry.
     *
     * @param message    the message that failed
     * @param bodyString decoded body, included in the alert
     * @param e          the failure
     * @return {@link Action#ReconsumeLater} while the reconsume count is within {@link #LIMIT},
     *         otherwise {@link Action#CommitMessage}
     */
    private Action handleFailure(Message message, String bodyString, Exception e) {
        String uuid = UUIDUtil.getUUID();
        // Log first: the alert below talks to an external service and must not be able to hide the root cause.
        log.error("日志唯一id：" + uuid + "\nconsuming origin chatwebsocket message error: ", e);
        try {
            DingTalkUtil dingTalkUtil = SpringUtil.getBean(DingTalkUtil.class);
            dingTalkUtil.notify("chatwebsocket 消费消息失败,msgId:" + message.getMsgID() + "key:" + message.getKey() + "\n 日志唯一id :" + uuid + "\n exception message:" + e.getMessage() + "\n body:" + bodyString);
        } catch (Exception notifyError) {
            // Alerting is best effort; a failure here must not escape the listener or skip the retry decision.
            log.error("日志唯一id：" + uuid + "\nfailed to send DingTalk alert: ", notifyError);
        }
        if (message.getReconsumeTimes() <= LIMIT) {
            return Action.ReconsumeLater;
        }
        log.error("consuming origin weChat message error 3 times message:{} ", message);
        return Action.CommitMessage;
    }

    /**
     * Handles {@code NEED_LOGIN}: drops the advisor's channel registration and tells every still
     * registered channel to log in again.
     *
     * @param webSocketMqDto message payload
     * @param channelIdList  channel ids that were registered for the advisor before removal; may be {@code null}
     */
    private void handleNeedLogin(WebSocketMqDto webSocketMqDto, List<String> channelIdList) {
        // Original design note (redis cache approach): channel / emp / ip are also stored globally in redis,
        // outer key = empId, inner key = ip, value = "{channelId,channelId2,...}" string.
        // Remove every long-connection entry that belongs to the advisor.
        String empId = webSocketMqDto.getEmpId();
        if (empId != null) {
            UserChannelRel.empIdWithChannelList.remove(RedisConstant.EMP_CHANNEL_RELATION.concat(empId));
        }
        log.info("long_link_del_success");
        if (CollectionUtils.isEmpty(channelIdList)) {
            return;
        }
        for (String channelId : channelIdList) {
            Channel receiverChannel = UserChannelRel.channelIdWithChannel.get(channelId);
            if (receiverChannel != null) {
                chatWriteHandler.needLogin(receiverChannel);
                chatReadHandler.removeCtx(receiverChannel);
            }
        }
    }

    /**
     * Handles {@code USER_CONNECT_RESULT}: registers the visitor's channel when the token is valid
     * and writes the login result back to that channel.
     *
     * @param webSocketMqDto message payload whose content is a {@link UserChannelMsgDto}
     */
    private void handleUserConnectResult(WebSocketMqDto webSocketMqDto) {
        UserChannelMsgDto userChannelMsgDto = JsonObjectUtil.parseObject(webSocketMqDto.getContent(), UserChannelMsgDto.class);
        WebSocketResultDto userConnectResultDto = new WebSocketResultDto();
        String userChannelId = userChannelMsgDto.getChannelId();

        // A missing "valid" flag is treated like an invalid token instead of failing on unboxing.
        if (!Boolean.TRUE.equals(userChannelMsgDto.getValid())) {
            userConnectResultDto.setSuccess(false);
            userConnectResultDto.setCode("500");
            userConnectResultDto.setMsg("token失效");
            chatWriteHandler.userConnect(userChannelId, userConnectResultDto);
            return;
        }

        Channel userChannel = UserChannelRel.channelIdWithChannel.get(userChannelId);
        if (userChannel == null) {
            // The channel is no longer registered, so there is nothing to register the user on or reply to.
            // Registering a null channel would only pollute the registries.
            log.warn("user_connect_result_channel_missing, channelId : {}", userChannelId);
            return;
        }

        userConnectResultDto.setSuccess(true);
        userConnectResultDto.setData(userChannelMsgDto);
        // key: userId ; value: channels of that user.
        // Declared as ArrayList because that is the value type exposed by the registry.
        // NOTE(review): the list itself is a plain ArrayList shared between threads — confirm this is acceptable.
        ArrayList<Channel> channels = UserChannelRel.userIdWithChannelList.get(userChannelMsgDto.getUserId());
        if (channels == null) {
            channels = new ArrayList<>();
            UserChannelRel.userIdWithChannelList.put(userChannelMsgDto.getUserId(), channels);
        }
        channels.add(userChannel);
        UserChannelRel.channelWithUser.put(userChannel, userChannelMsgDto);
        // write the successful login result back
        userConnectResultDto.setCode("200");
        chatWriteHandler.userConnect(userChannelId, userConnectResultDto);
    }

    /**
     * Handles {@code SYNC_MSG}: forwards the message to every channel of the advisor except the
     * one named as {@code currentChannelId} in the message data.
     *
     * @param webSocketMqDto message payload whose content is a {@link WebSocketMessageDto}
     * @param channelIdList  channel ids registered for the advisor; may be {@code null}
     */
    private void handleSyncMsg(WebSocketMqDto webSocketMqDto, List<String> channelIdList) {
        // Parsed against the raw class, hence the unchecked conversion to the parameterized type.
        @SuppressWarnings("unchecked")
        WebSocketMessageDto<ChatContentDto> webSocketMessageDto = JsonObjectUtil.parseObject(webSocketMqDto.getContent(), WebSocketMessageDto.class);
        // The data is converted through JSON again rather than read as a ChatContentDto directly;
        // presumably the generic payload is not materialized as ChatContentDto by the first parse — verify.
        ChatContentDto chatContentDto = JsonObjectUtil.parseObject(JSON.toJSON(webSocketMessageDto.getData()), ChatContentDto.class);
        log.warn("chatContentDto : {}", JSON.toJSON(chatContentDto));
        String currentChannelId = "";
        if (chatContentDto != null && Strings.isNotBlank(chatContentDto.getCurrentChannelId())) {
            currentChannelId = chatContentDto.getCurrentChannelId();
        }
        // only synchronise when the advisor has registered channels
        if (CollectionUtils.isEmpty(channelIdList)) {
            return;
        }
        log.info("SYNC_MSG_webSocketMessageDto is :{}", webSocketMessageDto);
        for (String channelId : channelIdList) {
            boolean isCurrentChannel = channelId.equals(currentChannelId);
            log.info("compare_channel : {}", isCurrentChannel);
            if (!isCurrentChannel) {
                chatWriteHandler.syncMsg(channelId, webSocketMessageDto);
            }
        }
    }

    /**
     * Handles every event without a dedicated branch by forwarding the raw content to the
     * advisor's registered channels.
     *
     * @param webSocketMqDto message payload
     * @param channelIdList  channel ids registered for the advisor; may be {@code null}
     */
    private void handleOtherMsg(WebSocketMqDto webSocketMqDto, List<String> channelIdList) {
        Object content = webSocketMqDto.getContent();
        if (CollectionUtils.isEmpty(channelIdList)) {
            return;
        }
        for (String channelId : channelIdList) {
            if (hasRegisteredChannel(channelId)) {
                chatWriteHandler.otherMsg(content, channelId);
            }
        }
    }

    /**
     * Fills the outgoing message from the {@link ChatContentDto} carried in the MQ payload.
     *
     * @param caller              name of the calling flow, used as log prefix
     * @param webSocketMqDto      message payload whose content is a {@link ChatContentDto}
     * @param webSocketMessageDto outgoing message to populate
     */
    private void fillFromChatContent(String caller, WebSocketMqDto webSocketMqDto, WebSocketMessageDto<ChatContentDto> webSocketMessageDto) {
        ChatContentDto chatContentDto = JsonObjectUtil.parseObject(webSocketMqDto.getContent(), ChatContentDto.class);
        log.info("{} chatContentDto {}", caller, JSON.toJSONString(chatContentDto));
        // Elapsed time of the whole message flow; purely diagnostic, so a missing timestamp must not abort delivery.
        Long msgOriginTime = chatContentDto.getMsgOriginTime();
        if (msgOriginTime != null) {
            long during = System.currentTimeMillis() - msgOriginTime;
            log.info("消息流程总耗时:{},msgId:{}", during, chatContentDto.getMsgId());
        }
        webSocketMessageDto.setSource(SourceTypeEnum.WE_CHAT.getCode());
        webSocketMessageDto.setMessageId(chatContentDto.getMsgId());
        webSocketMessageDto.setData(chatContentDto);
        webSocketMessageDto.setFrom(chatContentDto.getUserId().toString());
        webSocketMessageDto.setTo(chatContentDto.getSaleId().toString());
    }

    /**
     * Pushes a label message to the advisor's registered channels.
     *
     * @param webSocketMqDto      message payload
     * @param channelIdList       channel ids registered for the advisor; may be {@code null}
     * @param webSocketMessageDto outgoing message, populated by this method
     */
    private void labelToEmp(WebSocketMqDto webSocketMqDto, List<String> channelIdList, WebSocketMessageDto<ChatContentDto> webSocketMessageDto) {
        fillFromChatContent("labelToEmp", webSocketMqDto, webSocketMessageDto);
        if (CollectionUtils.isEmpty(channelIdList)) {
            return;
        }
        for (String channelId : channelIdList) {
            if (hasRegisteredChannel(channelId)) {
                log.info("WeChatToEmpListener LABEL_MSG webSocketMessageDto: {}", JSON.toJSONString(webSocketMessageDto));
                chatWriteHandler.labelMsgToEmp(webSocketMessageDto, channelId);
            }
        }
    }

    /**
     * Pushes a customer message to the advisor's registered channels.
     *
     * @param webSocketMqDto      message payload
     * @param channelIdList       channel ids registered for the advisor; may be {@code null}
     * @param webSocketMessageDto outgoing message, populated by this method
     */
    private void weChatToEmp(WebSocketMqDto webSocketMqDto, List<String> channelIdList, WebSocketMessageDto<ChatContentDto> webSocketMessageDto) {
        fillFromChatContent("weChatToEmp", webSocketMqDto, webSocketMessageDto);
        if (CollectionUtils.isEmpty(channelIdList)) {
            return;
        }
        for (String channelId : channelIdList) {
            if (hasRegisteredChannel(channelId)) {
                log.info("WeChatToEmpListener WE_CHAT_TO_EMP webSocketMessageDto: {}", JSON.toJSONString(webSocketMessageDto));
                chatWriteHandler.weChatToEmp(webSocketMessageDto, channelId);
            }
        }
    }

    /**
     * Notifies the advisor's registered channels that reception was switched off.
     *
     * @param webSocketMqDto      message payload
     * @param channelIdList       channel ids registered for the advisor; may be {@code null}
     * @param webSocketMessageDto message returned to the front end, populated by this method
     * @author liuyintao
     */
    private void saleCloseReceptionSwitchToEmp(WebSocketMqDto webSocketMqDto, List<String> channelIdList, WebSocketMessageDto<ChatContentDto> webSocketMessageDto) {
        saleReceptionSwitchToEmp(webSocketMqDto, webSocketMessageDto);
        if (CollectionUtils.isEmpty(channelIdList)) {
            return;
        }
        for (String channelId : channelIdList) {
            if (hasRegisteredChannel(channelId)) {
                log.info("WeChatToEmpListener SALE_CLOSE_RECEPTION_SWITCH webSocketMessageDto: {}", JSON.toJSONString(webSocketMessageDto));
                chatWriteHandler.saleCloseReceptionSwitchToEmp(webSocketMessageDto, channelId);
            }
        }
    }

    /**
     * Notifies the advisor's registered channels that reception was switched on.
     *
     * @param webSocketMqDto      message payload
     * @param channelIdList       channel ids registered for the advisor; may be {@code null}
     * @param webSocketMessageDto message returned to the front end, populated by this method
     * @author liuyintao
     */
    private void saleOpenReceptionSwitchToEmp(WebSocketMqDto webSocketMqDto, List<String> channelIdList, WebSocketMessageDto<ChatContentDto> webSocketMessageDto) {
        saleReceptionSwitchToEmp(webSocketMqDto, webSocketMessageDto);
        if (CollectionUtils.isEmpty(channelIdList)) {
            return;
        }
        for (String channelId : channelIdList) {
            if (hasRegisteredChannel(channelId)) {
                log.info("WeChatToEmpListener SALE_OPEN_RECEPTION_SWITCH webSocketMessageDto: {}", JSON.toJSONString(webSocketMessageDto));
                chatWriteHandler.saleOpenReceptionSwitchToEmp(webSocketMessageDto, channelId);
            }
        }
    }

    /**
     * Sets the message id and recipient of a reception-switch notification.
     *
     * @param webSocketMqDto      message payload supplying the key and the advisor id
     * @param webSocketMessageDto outgoing message to populate
     * @author liuyintao
     */
    private void saleReceptionSwitchToEmp(WebSocketMqDto webSocketMqDto, WebSocketMessageDto<ChatContentDto> webSocketMessageDto) {
        webSocketMessageDto.setMessageId(webSocketMqDto.getKey());
        webSocketMessageDto.setTo(webSocketMqDto.getEmpId());
    }

}
